package com.shujia.flink.state;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Demo5ExactlyOnceSInkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //创建kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
                .setTopics("words")//指定消费的topic
                .setGroupId("my-group")//指定消费者组
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())//指定读取数据的格式
                .build();

        //使用kafka source
        DataStream<String> wordsDS = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        DataStream<String> filterDS = wordsDS.filter(word -> !"".equals(word));


        Properties properties = new Properties();
        //指定事务超时时间，不能大于15分钟
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + "");

        //创建kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
                .setKafkaProducerConfig(properties)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("filter")//指定topic
                        .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
                        .build()
                )
                //指定数据处理的语义
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        //使用kafka sink
        filterDS.sinkTo(sink);

        //启动flink
        env.execute();

    }
}
